package net.chengp.esload4j.source.mysql.handle;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.springframework.util.StringUtils;

import com.alibaba.fastjson.JSONArray;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;

import net.chengp.esload4j.config.ESLoad4jConfig.DBS;
import net.chengp.esload4j.config.ESLoad4jConfig.MysqlSourceConfig;
import net.chengp.esload4j.source.mysql.db.MySQLDaoImpl;
import net.chengp.esload4j.target.es.ESLoad;

/**
 * 
 * @author chengp
 * @email E-mail:chengp@chengp.net
 * @version 创建时间：2020年9月12日 下午3:23:28 
 *
 */
public abstract class AbstractHandler {
	
	protected ESLoad ESLoad;
	
	private MysqlSourceConfig mysqlSourceConfig;
	
	public AbstractHandler(ESLoad ESLoad, MysqlSourceConfig mysqlSourceConfig) {
		this.ESLoad = ESLoad;
		this.mysqlSourceConfig = mysqlSourceConfig;
	}

	abstract public void handle();
	
	protected String getIndexName(long tableId) {
		TableMapEventData tmed = MySQLEventHandlerContext.tableCache.get(tableId);
		if(tmed==null) {
			return null;
		}
		DBS db = MySQLEventHandlerContext.listenTable.get(tmed.getDatabase() +"."+ tmed.getTable());
		String index = db.getIndex();
		if (StringUtils.isEmpty(index)) {
			index = tmed.getDatabase() + "-" + tmed.getTable();
		}
		return index;
	}

	protected MySQLDaoImpl getMySQLDaoImpl() {
		return new MySQLDaoImpl(mysqlSourceConfig);
	}
	
	protected List<Map<String, Object>> getOperDatas(Long tableId, List<Serializable[]> srows) throws Exception {
		TableMapEventData tmed = MySQLEventHandlerContext.tableCache.get(tableId);
		if(tmed==null) {
			return null;
		}
		JSONArray rows = JSONArray.parseArray(JSONArray.toJSONString(srows));
		List<Map<String, Object>> tableColumns = getMySQLDaoImpl().getTableColumns(tmed);
		List<Map<String, Object>> datas = new ArrayList<Map<String, Object>>();
		for (int i = 0; i < rows.size(); i++) {
			JSONArray row = rows.getJSONArray(i);
			Map<String, Object> d = new HashMap<String, Object>();
			for (int j = 0; j < row.size(); j++) {
				Map<String, Object> c = tableColumns.get(j);
				Object v = row.get(j);
				d.put(String.valueOf(c.get("COLUMN_NAME")), v);
			}
			datas.add(d);
		}
		return datas;
	}
	
	protected List<Map<String, Object>> getUpdateDatas(Long tableId, List<Map.Entry<Serializable[], Serializable[]>> srows) throws Exception {
		TableMapEventData tmed = MySQLEventHandlerContext.tableCache.get(tableId);
		if(tmed==null) {
			return null;
		}
		JSONArray rows = JSONArray.parseArray(JSONArray.toJSONString(srows));
		List<Map<String, Object>> tableColumns = getMySQLDaoImpl().getTableColumns(tmed);
		List<Map<String, Object>> datas = new ArrayList<Map<String, Object>>();
		@SuppressWarnings("unchecked")
		Map<JSONArray, JSONArray> row = rows.getObject(0, Map.class);
		row.forEach((k,v)->{
			Map<String, Object> d1 = new HashMap<String, Object>();
			Map<String, Object> d2 = new HashMap<String, Object>();
			for (int j = 0; j < k.size(); j++) {
				Map<String, Object> c = tableColumns.get(j);
				Object v1 = k.get(j);
				Object v2 = v.get(j);
				d1.put(String.valueOf(c.get("COLUMN_NAME")), v1);
				d2.put(String.valueOf(c.get("COLUMN_NAME")), v2);
			}
			datas.add(d1);
			datas.add(d2);
		});
		return datas;
	}

}
